computation progress display
authorJoey Hess <joeyh@joeyh.name>
Wed, 5 Mar 2025 17:46:06 +0000 (13:46 -0400)
committerJoey Hess <joeyh@joeyh.name>
Wed, 5 Mar 2025 17:46:06 +0000 (13:46 -0400)
Command/AddComputed.hs
Command/Recompute.hs
Remote/Compute.hs
TODO-compute

index 226f2c0c082b7bb1e2ed1931317e6b2ebee76354..f05f3bdfcd0c90e2651500520c3af8d81e04295b 100644 (file)
@@ -101,6 +101,7 @@ perform o r = do
        Remote.Compute.runComputeProgram program state
                (Remote.Compute.ImmutableState False)
                (getInputContent fast)
+               Nothing
                (addComputed "adding" True r (reproducible o) chooseBackend Just fast)
        next $ return True
 
index 2eda09886771c202bdbb6aa699a0bfa2963d2cc9..8233bc87e730486bf3ab1d6ff3a86e50bd999716 100644 (file)
@@ -129,6 +129,7 @@ perform o r file origkey origstate = do
        Remote.Compute.runComputeProgram program origstate
                (Remote.Compute.ImmutableState False)
                (getinputcontent program)
+               Nothing
                (go program reproducibleconfig)
        next $ return True
   where
index 8c06dd9061159b0943b831a6be90c3c01cda5b24..8b64dee56ed7f4a6e69b7b5eadafa73913ae9751 100644 (file)
@@ -44,6 +44,7 @@ import qualified Annex.Transfer
 import Logs.MetaData
 import Logs.EquivilantKeys
 import Logs.Location
+import Messages.Progress
 import Utility.Metered
 import Utility.TimeStamp
 import Utility.Env
@@ -59,6 +60,8 @@ import qualified Utility.SimpleProtocol as Proto
 import Network.HTTP.Types.URI
 import Data.Time.Clock
 import Text.Read
+import Control.Concurrent.STM
+import Control.Concurrent.Async
 import qualified Data.Map as M
 import qualified Data.Set as S
 import qualified Data.ByteString as B
@@ -209,8 +212,10 @@ newtype PercentFloat = PercentFloat Float
        deriving (Show, Eq)
 
 instance Proto.Serializable PercentFloat where
-       serialize (PercentFloat p) = show p
-       deserialize s = PercentFloat <$> readMaybe s
+       serialize (PercentFloat p) = show p ++ "%"
+       deserialize s = do
+               s' <- reverse <$> stripPrefix "%" (reverse s)
+               PercentFloat <$> readMaybe s'
 
 data ComputeState = ComputeState
        { computeParams :: [String]
@@ -374,9 +379,11 @@ runComputeProgram
        -> (OsPath -> Annex (Key, Maybe (Either Git.Sha OsPath)))
        -- ^ get input file's content, or Nothing the input file's
        -- content is not available
+       -> Maybe (Key, MeterUpdate)
+       -- ^ update meter for this key
        -> (ComputeState -> OsPath -> NominalDiffTime -> Annex v)
        -> Annex v
-runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent cont =
+runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent meterkey cont =
        withOtherTmp $ \othertmpdir ->
                withTmpDirIn othertmpdir (literalOsPath "compute") go
   where
@@ -391,10 +398,10 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                         }
                showOutput
                starttime <- liftIO currentMonotonicTimestamp
-               state' <- bracket
+               state' <- withmeterfile $ \meterfile -> bracket
                        (liftIO $ createProcess pr)
                        (liftIO . cleanupProcess)
-                       (getinput state tmpdir subdir)
+                       (getinput tmpdir subdir state meterfile)
                endtime <- liftIO currentMonotonicTimestamp
                cont state' subdir (calcduration starttime endtime)
                
@@ -408,13 +415,13 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                        , return tmpdir
                        )
        
-       getinput state' tmpdir subdir p = 
+       getinput tmpdir subdir state' meterfile p = 
                liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case
                        Just l
-                               | null l -> getinput state' tmpdir subdir p
+                               | null l -> getinput tmpdir subdir state' meterfile p
                                | otherwise -> do
-                                       state'' <- parseoutput p tmpdir subdir state' l
-                                       getinput state'' tmpdir subdir p
+                                       state'' <- parseoutput p tmpdir subdir state' meterfile l
+                                       getinput tmpdir subdir state'' meterfile p
                        Nothing -> do
                                liftIO $ hClose (stdoutHandle p)
                                liftIO $ hClose (stdinHandle p)
@@ -422,7 +429,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                                        giveup $ program ++ " exited unsuccessfully"
                                return state'
        
-       parseoutput p tmpdir subdir state' l = case Proto.parseMessage l of
+       parseoutput p tmpdir subdir state' meterfile l = case Proto.parseMessage l of
                Just (ProcessInput f) -> do
                        let f' = toOsPath f
                        let knowninput = M.member f' (computeInputs state')
@@ -453,7 +460,12 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                Just (ProcessOutput f) -> do
                        let f' = toOsPath f
                        checksafefile tmpdir subdir f' "output"
-                       let knownoutput = M.member f' (computeOutputs state')
+                       knownoutput <- case M.lookup f' (computeOutputs state') of
+                               Nothing -> return False
+                               Just mk -> do
+                                       when (mk == fmap fst meterkey) $
+                                               meterfile (subdir </> f')
+                                       return True
                        checkimmutable knownoutput "outputting" f' $ 
                                return $ if immutablestate
                                        then state'
@@ -463,12 +475,12 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                                                                (computeOutputs state')
                                                }
                Just (ProcessProgress percent) -> do
-                       -- XXX
+                       liftIO $ updatepercent percent
                        return state'
                Just ProcessReproducible ->
                        return $ state' { computeReproducible = True }
                Nothing -> giveup $
-                       program ++ " output included an unparseable line: \"" ++ l ++ "\""
+                       program ++ " output an unparseable line: \"" ++ l ++ "\""
 
        checksafefile tmpdir subdir f fileaction = do
                let err problem = giveup $
@@ -497,26 +509,57 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                liftIO . F.writeFile f =<< catObject gitsha
                return f
 
+       withmeterfile a = case meterkey of
+               Nothing -> a (const noop)
+               Just (_, progress) -> do
+                       filev <- liftIO newEmptyTMVarIO
+                       endv <- liftIO $ newEmptyTMVarIO
+                       let meterfile = void . liftIO 
+                               . atomically . tryPutTMVar filev
+                       let endmeterfile = atomically $ putTMVar endv ()
+                       tid <- liftIO $ async $ do
+                               v <- liftIO $ atomically $ 
+                                       (Right <$> takeTMVar filev)
+                                               `orElse`
+                                       (Left <$> takeTMVar endv)
+                               case v of
+                                       Right f -> watchFileSize f progress $ \_ ->
+                                               void $ liftIO $ atomically $
+                                                       takeTMVar endv
+                                       Left () -> return ()
+                       a meterfile 
+                               `finally` liftIO (endmeterfile >> wait tid)
+                       
+       updatepercent (PercentFloat percent) = case meterkey of
+               Nothing -> noop
+               Just (k, progress) -> case fromKey keySize k of
+                       Nothing -> noop
+                       Just sz ->
+                               progress $ BytesProcessed $ floor $ 
+                                       fromIntegral sz * percent / 100
+
 computationBehaviorChangeError :: ComputeProgram -> String -> OsPath -> Annex a
 computationBehaviorChangeError (ComputeProgram program) requestdesc p =
        giveup $ program ++ " is not behaving the same way it used to, now " ++ requestdesc ++ ": " ++ fromOsPath p
 
 computeKey :: RemoteStateHandle -> ComputeProgram -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification
-computeKey rs (ComputeProgram program) k _af dest p vc =
+computeKey rs (ComputeProgram program) k _af dest meterupdate vc =
        getComputeState rs k >>= \case
-               Just state -> 
+               Just state ->
                        case computeskey state of
-                               Just keyfile -> runComputeProgram
-                                       (ComputeProgram program)
-                                       state
-                                       (ImmutableState True)
-                                       (getinputcontent state)
-                                       (postcompute keyfile)
+                               Just keyfile -> go state keyfile
                                Nothing -> missingstate
                Nothing -> missingstate
   where
        missingstate = giveup "Missing compute state"
 
+       go state keyfile = metered (Just meterupdate) k Nothing $ \_ p ->
+               runComputeProgram (ComputeProgram program) state
+                       (ImmutableState True)
+                       (getinputcontent state)
+                       (Just (k, p))
+                       (postcompute keyfile)
+
        getinputcontent state f =
                case M.lookup f (computeInputs state) of
                        Just inputkey -> case keyGitSha inputkey of
index 09cc8538985cc670159df8e47715907ee0eebe9c..5b212695c6e7e5c375e7ea6445e3d08a439f08e4 100644 (file)
@@ -11,8 +11,6 @@
 * allow git-annex enableremote with program= explicitly specified,
   without checking annex.security.allowed-compute-programs
 
-* need progress bars for computations and implement PROGRESS message
-
 * addcomputed should honor annex.addunlocked.
 
 * Perhaps recompute should write a new version of a file as an unlocked